MINOR: MM2 surface log-truncation and topic-reset as actionable events in MirrorSourceTask #22380

Open: rajabhishekmaurya wants to merge 3 commits into apache:trunk from rajabhishekmaurya:feature/kafka-mm2

@rajabhishekmaurya

Summary

MirrorSourceTask currently relies on the source consumer's
auto.offset.reset policy to recover from any OffsetOutOfRangeException.
In production that means the two most damaging real-world failure modes are
indistinguishable from a healthy stream:

  1. Log truncation: the source log's start offset advances past the
    replication position (retention policy, kafka-delete-records.sh, etc.).
    With auto.offset.reset=earliest the consumer silently jumps forward; the
    target cluster ends up with a gap that no metric or log line reflects.
  2. Topic reset: the source topic is deleted and recreated, e.g. during
    maintenance. The consumer's stored position is now beyond the new
    endOffset, so it either throws repeatedly or, again with earliest,
    silently resets to the start of the new log.

This change makes MirrorSourceTask recognise both situations and react
differently.

Changes

A new catch (OffsetOutOfRangeException e) branch in poll() dispatches to a
new helper handleOutOfRangeOffsets(...):

  • If position > endOffset && beginningOffset == 0 for an out-of-range
    partition, the source topic looks newly recreated. The task logs a WARN
    with the timestamp and topic-partition, calls consumer.seek(tp, 0L), and
    resumes replication on the next poll.
  • If position < beginningOffset, data we still needed to replicate has been
    purged. The task logs an ERROR describing the gap (positions and the
    number of records lost) and throws a ConnectException, surfacing the
    failure to Connect's task lifecycle (fail-fast) instead of papering over it.
  • Any other shape of out-of-range is treated as unexpected and fails the
    task, so the implementation degrades safely rather than silently.
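
The three branches above can be sketched as a small decision function. This is a dependency-free illustration rather than the code in this PR: the class, enum, and method names (OutOfRangeClassifier, Action, classify, recordsLost) are hypothetical, and the offsets are passed in as plain longs where the real task would obtain them from the source consumer. The "records lost" figure is assumed here to be the offset gap between the stored position and the new beginning offset.

```java
// Hypothetical model of the dispatch described above; names are illustrative only.
final class OutOfRangeClassifier {

    enum Action { SEEK_TO_ZERO, FAIL_TRUNCATED, FAIL_UNEXPECTED }

    /** Decide how to react to one out-of-range partition. */
    static Action classify(long position, long beginningOffset, long endOffset) {
        // Position is past the end of a log that starts at 0: topic looks recreated.
        if (position > endOffset && beginningOffset == 0L) {
            return Action.SEEK_TO_ZERO;
        }
        // Position is behind the log start: unreplicated data was purged.
        if (position < beginningOffset) {
            return Action.FAIL_TRUNCATED;
        }
        // Anything else is unexpected, so fail rather than guess.
        return Action.FAIL_UNEXPECTED;
    }

    /** Size of the purged range [position, beginningOffset), assumed as the loss count. */
    static long recordsLost(long position, long beginningOffset) {
        return Math.max(0L, beginningOffset - position);
    }

    public static void main(String[] args) {
        System.out.println(classify(1_000L, 0L, 50L));    // recreated topic
        System.out.println(classify(200L, 500L, 900L));   // truncated log
        System.out.println(recordsLost(200L, 500L));      // offset gap
        System.out.println(classify(1_000L, 300L, 900L)); // neither shape
    }
}
```

In this model only SEEK_TO_ZERO lets the task continue; both FAIL_* outcomes would correspond to throwing a ConnectException in the real task.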

initializeConsumer(...) was also tightened: for partitions with no committed
offset, the task now calls seekToBeginning(...) explicitly. This makes the
task behave correctly when operators choose consumer.auto.offset.reset = none
to opt into the truncation detection above (otherwise a first-time partition
would throw NoOffsetForPartitionException on first poll).
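
A minimal sketch of that initial-position rule, under stated assumptions: the Seeker interface below is a hypothetical stand-in for the two consumer calls involved (the real KafkaConsumer methods take TopicPartition objects, not strings), and InitialPositions.apply is an illustrative name, not the method in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the initial-position rule; not the actual MirrorSourceTask code.
final class InitialPositions {

    /** Stand-in for the consumer's seek and seekToBeginning calls (assumed shape). */
    interface Seeker {
        void seek(String partition, long offset);
        void seekToBeginning(List<String> partitions);
    }

    /** Seek committed partitions to their offset; send the rest to the log start. */
    static void apply(List<String> assigned, Map<String, Long> committed, Seeker consumer) {
        List<String> uncommitted = new ArrayList<>();
        for (String partition : assigned) {
            Long offset = committed.get(partition);
            if (offset != null) {
                consumer.seek(partition, offset);
            } else {
                uncommitted.add(partition);
            }
        }
        // An explicit position means the reset policy is never consulted for these.
        if (!uncommitted.isEmpty()) {
            consumer.seekToBeginning(uncommitted);
        }
    }

    public static void main(String[] args) {
        Seeker printing = new Seeker() {
            public void seek(String p, long o) { System.out.println("seek " + p + " -> " + o); }
            public void seekToBeginning(List<String> ps) { System.out.println("seekToBeginning " + ps); }
        };
        apply(List.of("t-0", "t-1"), Map.of("t-0", 42L), printing);
    }
}
```

Because every assigned partition ends up with an explicit position, a first poll under auto.offset.reset = none has no partition left for which the consumer would need a reset policy.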

Net change: one file, ~70 added / ~7 removed.

Why the predicate is safe

The beginningOffset == 0 condition in the reset branch is load-bearing:
ordinary retention-driven trimming always leaves beginningOffset > 0, so
retention can never be mistaken for a topic reset — it always falls into the
truncation branch. Conversely, a recreated topic has both
beginningOffset == 0 and an endOffset lower than what we previously
read, so the recreation case has a clean discriminator.

Testing

  • The logic was exercised end-to-end with a Docker Compose harness (two
    single-node KRaft clusters + this build of MM2 + a small producer). Three
    scenarios are run:
    1. Normal replication of 1000 records — verified by reading the end offset
      of primary.commit-log on the target cluster.
    2. Truncation — pause MM2, produce additional records on the source, run
      kafka-delete-records.sh to lift the low watermark past MM2's last
      position, unpause. Asserts that
      "Source log truncation detected" appears in the task logs and that the
      task transitions to FAILED.
    3. Topic reset — stop MM2, delete and recreate the source topic, produce
      fresh records, restart MM2. Asserts that
      "Source topic reset detected" appears and that the new records arrive
      on the target.
  • All scenarios pass deterministically against apache/kafka:4.0.0.
  • Existing MirrorMaker 2 integration tests are unaffected (no public API
    changed; the new exception branch is the only execution path whose
    behaviour changes, and it triggers only in a state that vanilla MM2
    currently absorbs silently).

Compatibility

  • No public API change.
  • No new configuration property.
  • Behaviour change is gated on the consumer raising
    OffsetOutOfRangeException, which only happens when the operator sets
    consumer.auto.offset.reset = none. With the default
    (auto.offset.reset = earliest), MirrorSourceTask behaves exactly as
    before.

Committer Checklist (excluded from commit message)

  • Verified design and implementation
  • Verified test coverage and CI build status
  • Verified documentation (including upgrade notes) — N/A, no public surface change

shivdeep1 and others added 3 commits May 26, 2026 18:45
…ic reset handling

- Enhanced MirrorSourceTask with pre-flight and runtime boundary checks via handleOffsetBreach
- Added fatal exception throwing on log truncation to prevent silent data loss (Task 2)
- Added automatic consumer realignment to offset 0 upon administrative topic reset detection (Task 3)
- Created mm2.properties single-node cluster replication layout config
github-actions bot added the triage (PRs from the community), connect, mirror-maker-2, and small (Small PRs) labels on May 27, 2026
rajabhishekmaurya added a commit to rajabhishekmaurya/kafka-mirrormaker-challenge that referenced this pull request May 27, 2026
rajabhishekmaurya added a commit to rajabhishekmaurya/kafka-mirrormaker-challenge that referenced this pull request May 29, 2026